In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MLApp2").getOrCreate()
from pyspark.ml.linalg import Vectors
import pandas as pd
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
In [2]:
sc = spark.sparkContext 
In [3]:
!ls
GlobalLandTemperaturesByCity.1000000.csv	   elevation-getter.py
GlobalLandTemperaturesByCity.csv		   elevationsByCity.1000000.csv
climate-change-earth-surface-temperature-data.zip  elevationsByCity.csv
eda-1000000_v2.ipynb				   elevationsByCity.csv.tar.gz
eda-1000000_v3-Copy1.ipynb			   spark-warehouse
eda-1000000_v3.ipynb				   temp-plot.html
In [4]:
from os import listdir

# listing the working directory contents (avoid shadowing the imported listdir)
files = listdir("/home")
print(files)
['.ipynb_checkpoints', 'GlobalLandTemperaturesByCity.csv', 'eda-1000000_v3-Copy1.ipynb', 'elevationsByCity.1000000.csv', 'elevation-getter.py', 'elevationsByCity.csv', 'spark-warehouse', 'GlobalLandTemperaturesByCity.1000000.csv', 'elevationsByCity.csv.tar.gz', 'eda-1000000_v2.ipynb', 'eda-1000000_v3.ipynb', 'climate-change-earth-surface-temperature-data.zip', 'temp-plot.html']
In [5]:
dtf1 = spark.read.csv('hdfs:///datasets/GlobalLandTemperaturesByCity.1000000.csv',
                       inferSchema=True, 
                       header=True)
In [6]:
dtf1.show(2)
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 2 rows

In [7]:
type(dtf1)
Out[7]:
pyspark.sql.dataframe.DataFrame
In [8]:
print(dtf1.count())
999999

EDA

Average Temperature by City dataset

This dataset contains records of the average daily temperature in cities around the world, together with geographic positioning data such as latitude and longitude.

The analysis starts by examining missing values and adjusting data types, since some later operations require the variables to be in a standard Spark data type.
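As a first pass, the missing values in every column can be counted in a single aggregation; a minimal sketch (the notebook inspects the relevant columns individually below):

from pyspark.sql.functions import col, count, when

# null count per column of dtf1, computed in one pass over the data
dtf1.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in dtf1.columns]
).show()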

In [9]:
dtf1.printSchema()
root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [10]:
# Cities
dtf1.select(['City']).describe()
Out[10]:
DataFrame[summary: string, City: string]
In [11]:
# Distinct cities
dtf1.createOrReplaceTempView('dtf_temp')

query = 'SELECT count(DISTINCT City) FROM dtf_temp'
djtf = spark.sql(query)
djtf.show()
+--------------------+
|count(DISTINCT City)|
+--------------------+
|                 396|
+--------------------+

In [12]:
dtf1.select("City").distinct().explain()
== Physical Plan ==
*(2) HashAggregate(keys=[City#13], functions=[])
+- Exchange hashpartitioning(City#13, 200)
   +- *(1) HashAggregate(keys=[City#13], functions=[])
      +- *(1) FileScan csv [City#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://mycluster-master:9000/datasets/GlobalLandTemperaturesByCity.1000000.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<City:string>
In [13]:
dtf1.select("City").distinct().count()
Out[13]:
396
In [14]:
dtf1.describe('AverageTemperature').show()
+-------+------------------+
|summary|AverageTemperature|
+-------+------------------+
|  count|            955706|
|   mean|17.934368434434784|
| stddev|10.369847835153674|
|    min|           -31.312|
|    max| 39.15600000000001|
+-------+------------------+

In [15]:
# Countries
query = 'SELECT count(DISTINCT Country) FROM dtf_temp'
djtf = spark.sql(query)
djtf.explain()
djtf.show()
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(distinct Country#14)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(distinct Country#14)])
      +- *(2) HashAggregate(keys=[Country#14], functions=[])
         +- Exchange hashpartitioning(Country#14, 200)
            +- *(1) HashAggregate(keys=[Country#14], functions=[])
               +- *(1) FileScan csv [Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://mycluster-master:9000/datasets/GlobalLandTemperaturesByCity.1000000.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Country:string>
+-----------------------+
|count(DISTINCT Country)|
+-----------------------+
|                     83|
+-----------------------+

In [16]:
# Checking for nulls in AverageTemperature
from pyspark.sql.functions import isnan, when, count, col
dtf1.where(col('AverageTemperature').isNull()).count()
Out[16]:
44293
In [17]:
import pyspark.sql.functions as f
dtf1.where(col('AverageTemperature').isNull() & (col('Country') == 'Denmark')).show()
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-08-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-05-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-06-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-07-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-08-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-09-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-10-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-11-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1745-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1746-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1746-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1746-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1746-04-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1746-05-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1746-06-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1746-07-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 20 rows

Since several records are missing values, and there is an elevation dataset keyed by latitude and longitude with the same number of rows, both datasets will first be merged to avoid inconsistencies, and missing values will be handled afterwards.

In [18]:
# converting the timestamp column to string
from pyspark.sql.types import IntegerType, StringType, DoubleType
from pyspark.sql.functions import udf

# registering a UDF for the conversion
def timestampsToString(dt):
    # print('>>', dt, type(dt))
    return str(dt)

timestampToString_udf = udf(lambda z: timestampsToString(z), StringType())
In [19]:
dtf1.select('dt', timestampToString_udf('dt').alias('dt_str')).show()
+-------------------+-------------------+
|                 dt|             dt_str|
+-------------------+-------------------+
|1743-11-01 00:00:00|1743-11-01 00:00:00|
|1743-12-01 00:00:00|1743-12-01 00:00:00|
|1744-01-01 00:00:00|1744-01-01 00:00:00|
|1744-02-01 00:00:00|1744-02-01 00:00:00|
|1744-03-01 00:00:00|1744-03-01 00:00:00|
|1744-04-01 00:00:00|1744-04-01 00:00:00|
|1744-05-01 00:00:00|1744-05-01 00:00:00|
|1744-06-01 00:00:00|1744-06-01 00:00:00|
|1744-07-01 00:00:00|1744-07-01 00:00:00|
|1744-08-01 00:00:00|1744-08-01 00:00:00|
|1744-09-01 00:00:00|1744-09-01 00:00:00|
|1744-10-01 00:00:00|1744-10-01 00:00:00|
|1744-11-01 00:00:00|1744-11-01 00:00:00|
|1744-12-01 00:00:00|1744-12-01 00:00:00|
|1745-01-01 00:00:00|1745-01-01 00:00:00|
|1745-02-01 00:00:00|1745-02-01 00:00:00|
|1745-03-01 00:00:00|1745-03-01 00:00:00|
|1745-04-01 00:00:00|1745-04-01 00:00:00|
|1745-05-01 00:00:00|1745-05-01 00:00:00|
|1745-06-01 00:00:00|1745-06-01 00:00:00|
+-------------------+-------------------+
only showing top 20 rows

In [20]:
# applying the UDF, replacing the column with its String representation
dtf1 = dtf1.withColumn('dt', timestampToString_udf('dt'))
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
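For reference, the same conversion could be done without a Python UDF via the built-in cast; a sketch (a no-op at this point, since dt has already been converted above):

from pyspark.sql.functions import col
from pyspark.sql.types import StringType

# built-in cast avoids the Python UDF round trip when applied to the timestamp column
dtf1_cast = dtf1.withColumn('dt', col('dt').cast(StringType()))
dtf1_cast.printSchema()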

Elevations dataset (meters above sea level)

Dataset 2 contains elevation data (altitude above sea level) for the coordinates of the cities in dataset 1.

It was extracted from the http://srtm.csi.cgiar.org database using the https://github.com/Jorl17/open-elevation tool through the elevation-getter.py script, which queries the web service of an open-elevation instance: the coordinates (latitude, longitude) of each record in dataset 1 are sent as parameters and the elevation in meters above sea level is returned.
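The extraction itself happens outside this notebook; the sketch below shows the kind of request elevation-getter.py presumably issues against an open-elevation instance (the /api/v1/lookup endpoint and response fields follow the open-elevation documentation; the host and helper names are placeholders):

import requests

def parse_coord(value):
    # '57.05N' -> 57.05, '10.33W' -> -10.33 (sign taken from the hemisphere suffix)
    sign = -1.0 if value[-1] in ('S', 'W') else 1.0
    return sign * float(value[:-1])

def get_elevation(lat_str, lon_str, host='http://localhost:8080'):
    lat, lon = parse_coord(lat_str), parse_coord(lon_str)
    resp = requests.get(host + '/api/v1/lookup',
                        params={'locations': '{},{}'.format(lat, lon)})
    # response shape: {"results": [{"latitude": ..., "longitude": ..., "elevation": ...}]}
    return resp.json()['results'][0]['elevation']

# get_elevation('57.05N', '10.33E')  # elevation in meters, e.g. 8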

In [21]:
## Loading the elevationsByCity.csv dataset
dtf2 = spark.read.csv('hdfs:///datasets/elevationsByCity.1000000.csv',
                       inferSchema=True, 
                       header=True)
In [22]:
dtf2.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [23]:
dtf2Len = dtf2.count()
print(dtf2Len)
999999
In [24]:
# values flagged as erroneous
dtf2.filter(dtf2.Elevation == -5555).show()
+--------+---------+---------+
|Latitude|Longitude|Elevation|
+--------+---------+---------+
|  21.70N|   77.02E|    -5555|
|  36.17N|  139.23E|    -5555|
+--------+---------+---------+

In [25]:
dtf2.select('Longitude').distinct().count()
Out[25]:
305
In [26]:
dtf2.select('Latitude').distinct().count()
Out[26]:
59
In [27]:
from pyspark.sql import Row

# collecting all rows to the driver to patch the -5555 sentinel values
l = dtf2.head(dtf2Len)
elevations = []
for i, ro in enumerate(l):
    if int(ro[2]) == -5555:
        rowBefore = l[i-1]
        rowAfter = l[i+1]
        # fill with the most likely value; if the neighbouring rows disagree
        # the record is dropped from the corrected list
        if rowBefore[2] == rowAfter[2]:
            elevations.append((ro[0], ro[1], rowBefore[2]))
        print(i, ro[2], '>>', rowBefore[2], rowAfter[2])
    else:
        elevations.append((ro[0], ro[1], ro[2]))
261318 -5555 >> 336 336
428672 -5555 >> 77 77
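head() pulls the whole dataset to the driver, which works at this size; a distributed sketch of the same neighbour fill with window functions is shown below (it assumes monotonically_increasing_id() approximates the original row order, and unlike the loop above it keeps the -5555 sentinel when the neighbours disagree):

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, when, monotonically_increasing_id

# tag each row with an approximate positional index, then compare each value to its neighbours
w = Window.orderBy('_idx')
dfe_alt = (dtf2
           .withColumn('_idx', monotonically_increasing_id())
           .withColumn('prev', lag('Elevation').over(w))
           .withColumn('next', lead('Elevation').over(w))
           .withColumn('Elevation',
                       when((col('Elevation') == -5555) & (col('prev') == col('next')),
                            col('prev')).otherwise(col('Elevation')))
           .drop('_idx', 'prev', 'next'))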
In [28]:
print(elevations[32010:32020])
[('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250), ('53.84N', '91.36E', 250)]
In [29]:
# rebuilding the elevation DataFrame from the corrected list
schema = StructType(
    [
        StructField('Latitude', StringType(), True),
        StructField('Longitude', StringType(), True),
        StructField('Elevation', IntegerType(), True)])

rdd = sc.parallelize(elevations)
print(rdd)
dfe = spark.createDataFrame(rdd, schema)
print("dfe.schema:", dfe.schema)
dfe.show(5)
ParallelCollectionRDD[117] at parallelize at PythonRDD.scala:195
dfe.schema: StructType(List(StructField(Latitude,StringType,true),StructField(Longitude,StringType,true),StructField(Elevation,IntegerType,true)))
+--------+---------+---------+
|Latitude|Longitude|Elevation|
+--------+---------+---------+
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
+--------+---------+---------+
only showing top 5 rows

In [30]:
dfe.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [31]:
dfe.count()
Out[31]:
999999
In [32]:
# For some reason this raises an error:
#Py4JJavaError: An error occurred while calling o609.withColumn.
#: org.apache.spark.sql.AnalysisException: Resolved attribute(s) Elevation#269 missing from Latitude#246,Longitude#247,Elevation#248 in operator !Project [Latitude#246, Longitude#247, Elevation#269 AS Elevation#275]. Attribute(s) with the same name appear in the operation: Elevation. Please check if the right attribute(s) are used.;;
#!Project [Latitude#246, Longitude#247, Elevation#269 AS Elevation#275]
#+- Relation[Latitude#246,Longitude#247,Elevation#248] csv
#
# The cause: withColumn cannot take a Column that belongs to a different DataFrame;
# bringing in such a column requires a join or rebuilding the DataFrame, as done above.

#dtf2 = dtf2.withColumn("Elevation", df["Elevation"])
#from pyspark.sql.functions import col
#dtf2 = dtf2.withColumn("Elevation", df["Elevation"])
In [33]:
# checking that no erroneous values remain
dfe.filter(dfe.Elevation == -5555).count()
Out[33]:
0
In [34]:
dfe.filter(dfe.Elevation > 2500).explain()
== Physical Plan ==
*(1) Filter (isnotnull(Elevation#339) && (Elevation#339 > 2500))
+- Scan ExistingRDD[Latitude#337,Longitude#338,Elevation#339]
In [35]:
dfe.filter(dfe.Elevation > 2500).count()
Out[35]:
6395
In [36]:
# Checking for missing values in latitude and longitude
print(dfe.columns)
print(dtf1.columns)

# note: isnan() only flags NaN values; true nulls would need isNull()
print("Nulls in dfe")
dfe.select([count(when(isnan(c), c)).alias(c) for c in ['Latitude', 'Longitude']]).show()
print("Nulls in dtf1")
dtf1.select([count(when(isnan(c), c)).alias(c) for c in ['Latitude', 'Longitude']]).show()
['Latitude', 'Longitude', 'Elevation']
['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude']
Nulls in dfe
+--------+---------+
|Latitude|Longitude|
+--------+---------+
|       0|        0|
+--------+---------+

Nulls in dtf1
+--------+---------+
|Latitude|Longitude|
+--------+---------+
|       0|        0|
+--------+---------+

In [37]:
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [38]:
len1 = dtf1.count()
len2 = dfe.count()
print(">>>>> dtf1" , len1)
print(">>>>> dfe", len2)
>>>>> dtf1 999999
>>>>> dfe 999999
In [39]:
# NaN check on the temperature columns (isnan does not count nulls, hence 0 below)
dtf1.select([count(when(isnan(c), c)).alias(c) for c in ['AverageTemperature','AverageTemperatureUncertainty']]).show()
+------------------+-----------------------------+
|AverageTemperature|AverageTemperatureUncertainty|
+------------------+-----------------------------+
|                 0|                            0|
+------------------+-----------------------------+

In [40]:
# building Python lists to merge both datasets (joining them directly proved problematic)
l1 = dtf1.head(len1)
l2 = dfe.head(len2)
In [41]:
# Checking that latitude and longitude are consistent across both datasets
print(l1[2].asDict())
print(l2[2].asDict())

c = 0
d = 0
for i, t in enumerate(l1):
    d1 = l1[i].asDict()
    d2 = l2[i].asDict()
    if d1['Latitude'] != d2['Latitude'] or d1['Longitude'] != d2['Longitude']:
        print(l1[i])
        print("!",i,   d1['Latitude'], d2['Latitude'], "....", d1['Longitude'], d2['Longitude'])
        c += 1
    if d1['AverageTemperature'] is None:
        #print(i, ':', d1['AverageTemperature'])
        d = d + 1
print("diferentes", c)
print("AverageTemperature None", d)
{'dt': '1744-01-01 00:00:00', 'City': 'Århus', 'Latitude': '57.05N', 'Country': 'Denmark', 'AverageTemperature': None, 'Longitude': '10.33E', 'AverageTemperatureUncertainty': None}
{'Longitude': '10.33E', 'Latitude': '57.05N', 'Elevation': 8}
diferentes 0
AverageTemperature None 44293
In [42]:
# Since latitude and longitude match in both dataframes, they can be merged without risk of inconsistency
mergedList = []
for i, t in enumerate(l1):
    d1 = l1[i].asDict()
    d2 = l2[i].asDict()
    mergedList.append((d1['dt'], d1['AverageTemperature'], d1['AverageTemperatureUncertainty'], \
                       d1['City'], d1['Country'], d1['Latitude'], d1['Longitude'], d2['Elevation'] ))
print(len(mergedList))
999999
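An alternative sketch that avoids collecting both DataFrames with head() would pair rows by position on the cluster (assuming both DataFrames keep the same row order, and reusing the schema defined in the next cell):

# positional merge via zipWithIndex: (index, row) pairs joined on the index
rdd1 = dtf1.rdd.zipWithIndex().map(lambda r: (r[1], r[0]))
rdd2 = dfe.rdd.zipWithIndex().map(lambda r: (r[1], r[0]))
merged_rdd = (rdd1.join(rdd2)
              .sortByKey()
              .map(lambda kv: tuple(kv[1][0]) + (kv[1][1]['Elevation'],)))
# ndf_alt = spark.createDataFrame(merged_rdd, schema)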
In [43]:
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [44]:
from pyspark.sql.types import IntegerType, StringType, DoubleType
# building the merged dataframe
schema = StructType([
    StructField('dt', StringType(), True),
    StructField('AverageTemperature', DoubleType(), True),
    StructField('AverageTemperatureUncertainty', DoubleType(), True),
    StructField('City', StringType(), True),
    StructField('Country', StringType(), True),
    StructField('Latitude', StringType(), True),
    StructField('Longitude', StringType(), True),
    StructField('Elevation', IntegerType(), True),
])
rdd = sc.parallelize(mergedList)
print(rdd)
ndf = spark.createDataFrame(rdd, schema)
print('ndf.schema', ndf.schema)
ndf.show(5)
ParallelCollectionRDD[187] at parallelize at PythonRDD.scala:195
ndf.schema StructType(List(StructField(dt,StringType,true),StructField(AverageTemperature,DoubleType,true),StructField(AverageTemperatureUncertainty,DoubleType,true),StructField(City,StringType,true),StructField(Country,StringType,true),StructField(Latitude,StringType,true),StructField(Longitude,StringType,true),StructField(Elevation,IntegerType,true)))
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|Elevation|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|        8|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|        8|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
only showing top 5 rows

In [45]:
ndf.count()
Out[45]:
999999
In [46]:
ndf.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

Handling missing temperature values

For the AverageTemperature column (average daily temperature), as a generalization and to avoid erroneous estimates, the rows with a missing average temperature are simply dropped, since missing values affect only a small proportion of records, under 5%.

On the other hand, it is not safe to estimate or impute this value with an average, since that would require examining each city individually; for the minority of records without an average temperature, that level of detail would add work with little impact on the final data summary.
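The proportion behind the "under 5%" figure can be checked directly; a quick sketch (44293 of 999999 rows, roughly 4.4%):

missing = ndf.filter(ndf.AverageTemperature.isNull()).count()
total = ndf.count()
print('missing AverageTemperature: {:.2%}'.format(float(missing) / total))  # ~4.43%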

In [47]:
ndf.filter(ndf.AverageTemperature.isNull()).count()
Out[47]:
44293
In [48]:
# Dropping the rows with no average temperature value
ndf = ndf.filter(ndf.AverageTemperature.isNotNull())
In [49]:
ndf.show(5)
print('count:', ndf.count())
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|Elevation|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-04-01 00:00:00|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-05-01 00:00:00|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-06-01 00:00:00|14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-07-01 00:00:00|            16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|        8|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
only showing top 5 rows

count: 955706

The AverageTemperatureUncertainty column gives the uncertainty of the average temperature as a 95% confidence interval; records missing this value are filled with the global median, as a general estimate of the error.
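The next cells compute an approximate median with a relativeError of 0.25, which is quite loose, and substitute it through a UDF; a sketch with a tighter tolerance and the built-in fillna (no UDF needed) would be:

# tighter approximate median and built-in null replacement
median_unc = ndf.approxQuantile('AverageTemperatureUncertainty', [0.5], 0.01)[0]
ndf_filled = ndf.fillna({'AverageTemperatureUncertainty': median_unc})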

In [50]:
ndf.select('AverageTemperatureUncertainty').describe().show()
+-------+-----------------------------+
|summary|AverageTemperatureUncertainty|
+-------+-----------------------------+
|  count|                       955706|
|   mean|           1.0334317509778077|
| stddev|            1.093998176200025|
|    min|         0.036000000000000004|
|    max|                        15.03|
+-------+-----------------------------+

In [51]:
# approximate median (approxQuantile is a DataFrame method, not an importable function)
med = ndf.approxQuantile("AverageTemperatureUncertainty", [0.5], 0.25)
print(med)
AverageTemperatureUncertainty_median = med[0]
[0.607]
In [52]:
# replacing missing values with the median
replaceNullWithMedian_udf = udf(lambda temp: AverageTemperatureUncertainty_median if temp is None else temp, DoubleType())
ndf = ndf.withColumn("AverageTemperatureUncertainty", replaceNullWithMedian_udf(ndf['AverageTemperatureUncertainty']))
ndf.show(5)
ndf.count()
#ndf.filter(ndf.AverageTemperatureUncertainty.isNull()).count()
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|Elevation|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-04-01 00:00:00|5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-05-01 00:00:00|            10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-06-01 00:00:00|14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|        8|
|1744-07-01 00:00:00|            16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|        8|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+---------+
only showing top 5 rows

Out[52]:
955706
In [53]:
ndf.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

Plots

At this point the ndf dataframe is ready for plotting and statistical analysis.
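toPandas() in the next cell brings all ~955k values to the driver; if that were a concern, a sampled sketch would be enough for the histogram:

# hypothetical: sample ~10% of the rows before collecting to pandas for plotting
sample_pd = (ndf.select('AverageTemperature')
                .sample(withReplacement=False, fraction=0.1, seed=42)
                .toPandas()['AverageTemperature'])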

In [54]:
ndf_pd = ndf.toPandas()['AverageTemperature']
In [55]:
import plotly
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from plotly.offline import init_notebook_mode, iplot

init_notebook_mode(connected=True)         # initiate notebook for offline plot

Average temperature distribution

In [56]:
data = [go.Histogram(x=ndf_pd)]

plotly.offline.iplot({
    "data": data,
    "layout": go.Layout(title="Average temperature distribution")
})